package dao

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HColumnDescriptor, TableName, HTableDescriptor, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Connection, Put, Admin, ConnectionFactory}
import util.GlobalArgs

/**
  * Data-access helper for creating HBase tables and writing rows into them.
  *
  * The instance lazily opens one HBase [[Connection]] (and one [[Admin]] on top of it)
  * and caches both until [[closeConnection]] is called. Every table operation
  * (`createTableWithOverWrite`, `insertRow`, `insertRows`) releases the cached
  * connection when it finishes, whether it succeeds, returns early or throws.
  * Table operations are serialized on this instance so that one caller cannot
  * close the shared connection while another is still using it.
  *
  * Created by chenjianwen on 2016/3/10.
  */
class ResultHBaseDao {

  // Cached, lazily created resources; always read and written while holding `this`.
  private[this] var conn: Connection = _
  private[this] var admin: Admin = _

  /**
    * Creates a table, replacing any existing table of the same name.
    *
    * If the table already exists it is disabled (when enabled) and deleted
    * before being re-created with the given column families.
    *
    * @param tableName  name of the table to (re)create
    * @param familyName names of the column families the new table will have
    */
  def createTableWithOverWrite(tableName: String, familyName: List[String]): Unit = withConnection {
    val currentAdmin = hbaseAdmin
    val tableNameObj = TableName.valueOf(tableName)

    // Drop the existing table first; HBase requires a table to be disabled before deletion.
    if (currentAdmin.tableExists(tableNameObj)) {
      if (currentAdmin.isTableEnabled(tableNameObj)) {
        currentAdmin.disableTable(tableNameObj)
      }
      currentAdmin.deleteTable(tableNameObj)
    }

    val descriptor = new HTableDescriptor(tableNameObj)
    familyName.foreach(family => descriptor.addFamily(new HColumnDescriptor(family)))
    currentAdmin.createTable(descriptor)
  }

  /**
    * Writes a single cell into an existing table.
    *
    * If the table does not exist, a message is printed to standard output and
    * nothing is written.
    *
    * NOTE: all strings are converted with `String.getBytes`, i.e. using the
    * platform default charset.
    *
    * @tparam T unused; kept so existing call sites that pass a type argument still compile
    * @param row        row key
    * @param familyName column family of the cell
    * @param columnName column qualifier of the cell
    * @param tableName  name of the target table
    * @param value      cell value
    */
  def insertRow[T](row: String, familyName: String, columnName: String, tableName: String, value: String): Unit =
    withExistingTable(tableName) { table =>
      val put = new Put(row.getBytes)
      put.add(familyName.getBytes, columnName.getBytes, value.getBytes)
      table.put(put)
    }

  /**
    * Writes a batch of rows into an existing table.
    *
    * If the table does not exist, a message is printed to standard output and
    * nothing is written.
    *
    * @param puts      the `Put` operations to apply
    * @param tableName name of the target table
    */
  def insertRows(puts: java.util.List[Put], tableName: String): Unit =
    withExistingTable(tableName)(_.put(puts))

  /**
    * Closes the cached `Admin` and `Connection`, if any, and forgets them.
    *
    * Calling this when nothing is open is a no-op; it never opens a new
    * connection just to close it.
    */
  def closeConnection(): Unit = this.synchronized {
    try {
      if (admin != null) admin.close()
    } finally {
      admin = null
      try {
        if (conn != null && !conn.isClosed) conn.close()
      } finally {
        conn = null
      }
    }
  }

  /**
    * Returns the cached `Admin`, creating it (and the underlying connection)
    * when there is none or when the connection has been closed.
    */
  def hbaseAdmin: Admin = this.synchronized {
    if (admin == null || conn == null || conn.isClosed) {
      admin = hbaseConnection.getAdmin
    }
    admin
  }

  /**
    * Returns the cached `Connection`, opening a new one from
    * `ResultHBaseDao.getConf` when there is none or the cached one is closed.
    */
  def hbaseConnection: Connection = this.synchronized {
    if (conn == null || conn.isClosed) {
      conn = ConnectionFactory.createConnection(ResultHBaseDao.getConf)
    }
    conn
  }

  /** Runs `body` while holding this instance's lock and always releases the connection afterwards. */
  private def withConnection[A](body: => A): A = this.synchronized {
    try body finally closeConnection()
  }

  /**
    * Runs `write` against the named table if it exists, closing the table handle
    * afterwards; otherwise prints a message and does nothing.
    */
  private def withExistingTable(tableName: String)(write: org.apache.hadoop.hbase.client.Table => Unit): Unit =
    withConnection {
      val tableNameObj = TableName.valueOf(tableName)
      if (!hbaseAdmin.tableExists(tableNameObj)) {
        println(s"${tableName} is not exist!")
      } else {
        val table = hbaseConnection.getTable(tableNameObj)
        try write(table) finally table.close()
      }
    }

}



/**
  * Companion object: holds the shared HBase `Configuration` (built once and
  * reused) and a factory for [[ResultHBaseDao]] instances.
  */
object ResultHBaseDao {
  // Built on first use and reused afterwards; guarded by `this`.
  private[this] var conf: Configuration = _

  /**
    * Returns the shared HBase configuration, creating it on first use.
    *
    * The configuration is populated from `GlobalArgs` (ZooKeeper quorum and
    * HBase root directory) and sets `ipc.socket.timeout` to 18000000.
    * The same instance is returned on every call, so callers should not
    * mutate it.
    */
  def getConf: Configuration = this.synchronized {
    if (conf == null) {
      // Assign to the field only after the configuration is fully populated,
      // so a failure while reading GlobalArgs does not cache a partial config.
      val created = HBaseConfiguration.create()
      created.set("hbase.zookeeper.quorum", GlobalArgs.HABSE_HOST)
      created.set("hbase.rootdir", GlobalArgs.HBASE_HDFS_DIR)
      created.set("ipc.socket.timeout", "18000000")
      conf = created
    }
    conf
  }

  /**
    * Creates a new DAO instance.
    *
    * Must use `new`: writing `ResultHBaseDao()` here would call this very
    * method again and recurse until the stack overflows.
    */
  def apply(): ResultHBaseDao = new ResultHBaseDao
}
